Amazon Kinesis Data Firehose でバッファリング間隔を 0 秒に設定可能になりました

Amazon Kinesis Data Firehose でバッファリング間隔を 0 秒に設定可能になりました

ただし、Amazon Kinesis Data Firehose の主な役割は適切なバッファリングにあるため、本末転倒の感はある
Clock Icon2023.12.27

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、CX 事業本部 Delivery 部の若槻です。

Amazon Kinesis Data Firehose のアップデートで、Derivery Stream の 0 秒のバッファリング間隔がサポートされました。

今までは最小の設定可能間隔は 60 秒でしたが、動的パーティショニングなどの追加の処理が無い場合は、最短で 5 秒以内にデータが出力可能になります。

試してみた

Kinesis Data Firehose の作成

マネジメントコンソールから配信ストリームを作成します。

すると Buffer interval のメニューで 0 が設定可能になっています。

メニュー内の中期でも 0 秒から 900 秒までの間で設定可能と記載されています。

Minimum: 0 seconds, maximum: 900 seconds. Recommended: 300 seconds.

配信ストリームが作成できました。

バッファリング間隔が 0 秒に設定されています。

動作確認

バッファリング期間が 0 秒であることを確認するために、配信ストリームに連続でデータを Put してみます。下記のように put-record コマンドを 5 回連続で実行します。

aws firehose put-record --delivery-stream-name $DELIVERY_STREAM_NAME --record Data="Sample Data\n" --cli-binary-format raw-in-base64-out
aws firehose put-record --delivery-stream-name $DELIVERY_STREAM_NAME --record Data="Sample Data\n" --cli-binary-format raw-in-base64-out
aws firehose put-record --delivery-stream-name $DELIVERY_STREAM_NAME --record Data="Sample Data\n" --cli-binary-format raw-in-base64-out
aws firehose put-record --delivery-stream-name $DELIVERY_STREAM_NAME --record Data="Sample Data\n" --cli-binary-format raw-in-base64-out
aws firehose put-record --delivery-stream-name $DELIVERY_STREAM_NAME --record Data="Sample Data\n" --cli-binary-format raw-in-base64-out

すると体感的にはほぼタイムラグ無く S3 バケットにオブジェクトが作成されてました。

バッファリング期間が 0 秒のため、1 秒または 2 秒ごとにデータが出力できています。

AWS CDK では未対応

現時点での AWS CDK の最新バージョン v2.116.1 では、@aws-cdk/aws-kinesisfirehose-destinations-alpha module でバッファリング期間に 0 秒を指定することができません。

import { Construct } from 'constructs';
import {
  aws_s3,
  Stack,
  StackProps,
  RemovalPolicy,
  Duration,
} from 'aws-cdk-lib';
import * as firehose_alpha from '@aws-cdk/aws-kinesisfirehose-alpha';
import * as firehose_destinations_alpha from '@aws-cdk/aws-kinesisfirehose-destinations-alpha';

export class CdkSampleStack extends Stack {
  constructor(scope: Construct, id: string, props: StackProps) {
    super(scope, id, props);

    // 出力先バケット
    const dataBucket = new aws_s3.Bucket(this, 'DestinationBucket', {
      removalPolicy: RemovalPolicy.DESTROY,
      autoDeleteObjects: true,
    });

    // Kinesis Firehose Delivery Stream
    new firehose_alpha.DeliveryStream(this, 'DeliveryStream', {
      destinations: [
        new firehose_destinations_alpha.S3Bucket(dataBucket, {
          dataOutputPrefix: 'data/YYYY/MM/dd/HH',
          errorOutputPrefix:
            'error/!{firehose:error-output-type}/YYYY/MM/dd/HH',
          bufferingInterval: Duration.seconds(0),
        }),
      ],
    });
  }
}

上記のように bufferingIntervalDuration.seconds(0) を指定すると、下記のようにバリデーションに弾かれてエラーが発生します。

$ cdk synth
/Users/user/projects/repo/cdk_sample_app/node_modules/@aws-cdk/aws-kinesisfirehose-destinations-alpha/lib/private/helpers.ts:91
    throw new Error(`Buffering interval must be between 60 and 900 seconds. Buffering interval provided was ${intervalInSeconds} seconds.`);
          ^
Error: Buffering interval must be between 60 and 900 seconds. Buffering interval provided was 0 seconds.
    at createBufferingHints (/Users/user/projects/repo/cdk_sample_app/node_modules/@aws-cdk/aws-kinesisfirehose-destinations-alpha/lib/private/helpers.ts:91:11)
    at S3Bucket.bind (/Users/user/projects/repo/cdk_sample_app/node_modules/@aws-cdk/aws-kinesisfirehose-destinations-alpha/lib/s3-bucket.ts:46:45)
    at new DeliveryStream (/Users/user/projects/repo/cdk_sample_app/node_modules/@aws-cdk/aws-kinesisfirehose-alpha/lib/delivery-stream.ts:381:53)
    at new CdkSampleStack (/Users/user/projects/repo/cdk_sample_app/lib/cdk-sample-stack.ts:23:5)
    at Object.<anonymous> (/Users/user/projects/repo/cdk_sample_app/bin/cdk_sample_app.ts:6:1)
    at Module._compile (node:internal/modules/cjs/loader:1241:14)
    at Module.m._compile (/Users/user/.npm/_npx/1bf7c3c15bf47d04/node_modules/ts-node/src/index.ts:1618:23)
    at Module._extensions..js (node:internal/modules/cjs/loader:1295:10)
    at Object.require.extensions.<computed> [as .ts] (/Users/user/.npm/_npx/1bf7c3c15bf47d04/node_modules/ts-node/src/index.ts:1621:12)
    at Module.load (node:internal/modules/cjs/loader:1091:32)

Subprocess exited with error 1

AWS CDK で配信ストリームを構築している場合は、L1 Construct を使用するか、今後のアップデートでサポートされるまで待つ必要があります。

バッファリング期間を 0 秒にするのは本末転倒の感はある

そもそも Amazon Kinesis Data Firehose はリアルタイムの大規模なデータストリームを効率的に処理するためのサービスです。受信したデータを配信ストリームでバッファすることで、効率的なデータ転送とコスト削減を実現しています。よってバッファ期間を 0 秒にすると出力するオブジェクト数が増えるため、出力時の動的パーティショニングや出力されたオブジェクトに対する Amazon Ahtena クエリの実行などでコストが増える可能性があります。そのため本番環境でバッファリング期間を 0 秒にするのは多くのユースケースで本末転倒となる可能性があります。AWS 公式で推奨されているバッファリング間隔は 300 秒なので、それより短くする場合はその必要性についてきちんと認識できている必要があります。

おわりに

Amazon Kinesis Data Firehose でバッファリング間隔を 0 秒に設定可能になったのでご紹介しました。

本末転倒のケースが多いとは述べましたが、Kinesis Data Firehose を利用したアーキテクチャの設計の幅が広がったことには違いありません。開発環境でのみバッファリング間隔を短くするなどすれば、Kinesis Data Firehose を使用した機能の動作確認の効率化にも役立てられると思います。便利なアップデートであることは間違いないので、適切に活用していきましょう。

以上

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.